Add fast raw memory search path #194
44c885e to b7c56e7
Rebased this replacement PR on the latest Latest local validation after rebase:
Code Review
This pull request enhances the memory search endpoint by introducing concurrent search execution via a retrieval plan, adding support for synthesized answers, and expanding search domains to include snippets and code. It also implements latency tracking with percentile statistics and a TTL-based cache for profile catalogs. Feedback focuses on ensuring thread-safety for the new caches, improving type hints in helper functions, adopting more idiomatic awaitable detection using inspect.isawaitable, and refining the cache eviction strategy to follow an LRU policy.
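As a rough illustration of the percentile statistics mentioned in the summary, the sketch below computes nearest-rank percentiles over a list of per-request latencies. The function names and the keys of the returned dictionary are hypothetical; the actual fields of the PR's SearchLatencySummary model are not shown in this conversation.

```python
import math
from typing import Dict, List


def percentile(sorted_values: List[float], pct: float) -> float:
    """Nearest-rank percentile of an already sorted, non-empty list."""
    if not sorted_values:
        raise ValueError("percentile() needs at least one sample")
    # Nearest-rank: the smallest value with at least pct% of samples at or below it.
    rank = max(1, math.ceil(pct / 100.0 * len(sorted_values)))
    return sorted_values[rank - 1]


def latency_summary(samples_ms: List[float]) -> Dict[str, float]:
    """Summarize latencies in milliseconds (key names are illustrative only)."""
    ordered = sorted(samples_ms)
    return {
        "p50": percentile(ordered, 50),
        "p95": percentile(ordered, 95),
        "max": ordered[-1],
    }


summary = latency_summary([10, 20, 30, 40, 50, 60, 70, 80, 90, 100])
```

Nearest-rank always returns an observed sample, which keeps the numbers easy to explain; an interpolating method would be a reasonable alternative.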
src/pipelines/retrieval.py (137-140)
The RetrievalPipeline cache dictionaries are accessed and modified in an asynchronous context but lack thread-safety guards. While asyncio tasks on a single thread are safe if they don't yield, these methods are synchronous and could be called from multiple threads if the server is configured with a thread pool. Adding a threading.Lock would prevent potential race conditions during cache pruning and updates.
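A minimal sketch of the suggestion above, assuming the cache is a plain dictionary of timestamped entries. LockedTTLCache and its method names are hypothetical stand-ins, not the pipeline's actual attributes; the point is only that pruning and updating happen while holding one threading.Lock.

```python
import threading
import time
from typing import Any, Dict, Optional, Tuple


class LockedTTLCache:
    """Hypothetical stand-in for the pipeline's cache dictionaries."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[float, Any]] = {}
        self._lock = threading.Lock()

    def _prune(self, now: float) -> None:
        # Caller must hold the lock: drop entries older than the TTL.
        expired = [k for k, (ts, _) in self._entries.items() if now - ts > self._ttl]
        for key in expired:
            del self._entries[key]

    def get(self, key: str, now: Optional[float] = None) -> Optional[Any]:
        now = time.monotonic() if now is None else now
        with self._lock:
            self._prune(now)
            entry = self._entries.get(key)
            return None if entry is None else entry[1]

    def put(self, key: str, value: Any, now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        with self._lock:
            # Prune and write under the same lock so no reader sees a half-pruned dict.
            self._prune(now)
            self._entries[key] = (now, value)
```

The optional now argument exists only to make the expiry behaviour easy to test; real callers would omit it.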
src/api/routes/memory.py (143)
The _timed helper function is missing type hints for its arguments, which reduces code clarity and makes it harder for static analysis tools to verify usage. Adding hints for func, args, and kwargs would improve maintainability.
async def _timed(mode: str, func: Callable, *args: Any, threaded: bool = False, **kwargs: Any):
src/api/routes/memory.py (149-150)
Checking for __await__ is a bit fragile for detecting awaitables. It is more idiomatic and robust to use inspect.isawaitable from the standard library to determine if the result needs to be awaited.
if inspect.isawaitable(result):
    result = await result
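Putting the two memory.py suggestions together, a typed timing helper might look roughly like the sketch below. The body of the PR's _timed is not shown in this conversation, so the return shape (mode, result, elapsed milliseconds), the use of asyncio.to_thread for threaded=True, and the two example backends are all assumptions.

```python
import asyncio
import inspect
import time
from typing import Any, Callable, List, Tuple


async def timed(
    mode: str,
    func: Callable[..., Any],
    *args: Any,
    threaded: bool = False,
    **kwargs: Any,
) -> Tuple[str, Any, float]:
    """Run func, await its result if needed, and report elapsed milliseconds."""
    start = time.perf_counter()
    if threaded:
        # Blocking callables run in a worker thread so the event loop stays free.
        result = await asyncio.to_thread(func, *args, **kwargs)
    else:
        result = func(*args, **kwargs)
    # inspect.isawaitable covers coroutines, Tasks, Futures, and __await__ objects.
    if inspect.isawaitable(result):
        result = await result
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return mode, result, elapsed_ms


def blocking_search(query: str) -> List[str]:  # hypothetical sync backend call
    return [query.upper()]


async def async_search(query: str) -> List[str]:  # hypothetical async backend call
    return [query.lower()]
```

asyncio.to_thread requires Python 3.9 or later; on older versions loop.run_in_executor would play the same role.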
src/pipelines/retrieval.py (507)
Updating the cache entry by assignment in a standard Python dictionary does not move the key to the end of the insertion order. This results in a FIFO (First-In-First-Out) eviction policy rather than LRU (Least Recently Used). For an active user cache, re-inserting the key or using collections.OrderedDict.move_to_end is preferred to keep frequently accessed items in the cache.
self._profile_catalog_cache.pop(user_id)
self._profile_catalog_cache[user_id] = (now, cached[1], cached[2])
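To make the FIFO versus LRU distinction concrete, here is a small self-contained sketch using collections.OrderedDict. The LRUCache class is hypothetical and much simpler than the pipeline's cache (no TTL, no lock); it only demonstrates that a read must move the key to the end for eviction to follow recency.

```python
from collections import OrderedDict
from typing import Any, Optional


class LRUCache:
    """Hypothetical bounded cache that evicts the least recently used key."""

    def __init__(self, max_entries: int) -> None:
        self._max = max_entries
        self._data: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._data:
            return None
        # Mark the key as most recently used.
        self._data.move_to_end(key)
        return self._data[key]

    def put(self, key: str, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        # Evict from the front, which holds the least recently used keys.
        while len(self._data) > self._max:
            self._data.popitem(last=False)

    def keys(self) -> list:
        return list(self._data)


# Plain dict assignment to an existing key keeps its original position.
plain = {"a": 1, "b": 2}
plain["a"] = 3
```

With max_entries=2, putting "a" and "b", reading "a", and then putting "c" evicts "b" rather than "a".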
Pushed follow-up commit
Validation:
| Filename | Overview |
|---|---|
| src/api/routes/memory.py | Refactors search_memory to run all domain searches concurrently via asyncio.gather, adds _timed helper for per-domain latency tracking, and adds snippet/code search paths; the raw profile search bypasses the new pipeline cache (causing double DB queries when answer=True). |
| src/pipelines/retrieval.py | Adds TTL+LRU cache for _fetch_profile_catalog and a locked plan cache via raw_retrieval_plan; the plan cache key includes answer which never changes the returned value (wasted entries), and the profile cache write lacks a double-check that risks stampede overwrites. |
| src/api/schemas.py | Extends SearchRequest with answer flag and deduplicated domains, adds SearchLatencySummary model, and expands SearchResponse with answer/confidence/latency fields; no issues found. |
| tests/api/test_dependencies_and_routes.py | Adds a complete FakeRetrievalPipeline with all five domain search stubs and a new integration test covering raw hits, per-domain latency, and optional answer synthesis; coverage looks solid for the happy path. |
Sequence Diagram
sequenceDiagram
participant Client
participant Route as search_memory (route)
participant Plan as raw_retrieval_plan (cache)
participant Gather as asyncio.gather
participant VS as vector_store
participant Neo4j as neo4j
participant Pipeline as pipeline.run (answer path)
Client->>Route: POST /v1/memory/search
Route->>Plan: raw_retrieval_plan(domains, answer)
Plan-->>Route: normalized domain tuple
Route->>Gather: gather(_timed tasks for all domains)
par profile (thread)
Gather->>VS: search_by_metadata (profile)
VS-->>Gather: raw results
and temporal (thread)
Gather->>Neo4j: search_events_by_embedding
Neo4j-->>Gather: raw events
and summary/snippet/code (async)
Gather->>VS: search_by_text (summary / code)
Gather->>VS: _search_snippet (sandboxed ns)
VS-->>Gather: raw results
end
Gather-->>Route: all domain results + latency
opt "answer=true"
Route->>Pipeline: pipeline.run(query, user_id, top_k)
Note over Pipeline,VS: _fetch_profile_catalog checks cache,\nthen queries VS again (cache miss on first call)
Pipeline->>VS: search_by_metadata (profile) duplicate fetch
Pipeline-->>Route: answer + sources + confidence
end
Route-->>Client: SearchResponse(results, latency_ms, latency_stats, [answer])
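The fan-out in the diagram can be sketched as follows: blocking searches are pushed to worker threads and gathered together with native coroutines. The two search functions are hypothetical placeholders for the vector store and Neo4j calls, and the sleeps only simulate I/O.

```python
import asyncio
import time
from typing import Dict, List


def search_profile(query: str) -> List[str]:
    """Hypothetical blocking search (stands in for a sync client call)."""
    time.sleep(0.05)
    return [f"profile:{query}"]


async def search_summary(query: str) -> List[str]:
    """Hypothetical async search."""
    await asyncio.sleep(0.05)
    return [f"summary:{query}"]


async def search_all(query: str) -> Dict[str, List[str]]:
    domains = ("profile", "summary")
    # gather returns results in the order the awaitables were passed in,
    # so zipping with the domain names is safe.
    results = await asyncio.gather(
        asyncio.to_thread(search_profile, query),
        search_summary(query),
    )
    return dict(zip(domains, results))


found = asyncio.run(search_all("q"))
```

Because both searches wait concurrently, the total time is close to the slowest single search rather than the sum of all of them.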
Comments Outside Diff (1)
- src/api/routes/memory.py, line 1041-1049: Raw profile search bypasses the new cache on every request
_search_profile calls pipeline.vector_store.search_by_metadata directly, completely bypassing the TTL cache that this PR added to _fetch_profile_catalog. When answer=True and "profile" is in the requested domains, both the raw concurrent task and the subsequent pipeline.run call independently hit the vector store with the identical query (filters={"user_id": user_id, "domain": "profile"}, top_k=100). That is two round-trips per request where one would do, and the cache never benefits the raw path. The fix would be to have _search_profile call pipeline._fetch_profile_catalog(user_id) and extract results from the cached return value instead of re-querying.
Reviews (1): Last reviewed commit: "Address cache review feedback"
def raw_retrieval_plan(self, domains: List[str], answer: bool = False) -> tuple[str, ...]:
    """Return a cached deterministic raw-search plan for the requested domains."""
    ordered_allowed = ("profile", "temporal", "summary", "snippet", "code")
    normalized = tuple(d for d in ordered_allowed if d in set(domains))
    key = (normalized, answer)
    with self._raw_retrieval_plan_cache_lock:
        if key not in self._raw_retrieval_plan_cache:
            self._raw_retrieval_plan_cache[key] = normalized
        return self._raw_retrieval_plan_cache[key]
answer is part of the cache key but is never used to compute the returned tuple — both (normalized, True) and (normalized, False) store and return the same normalized value. This silently doubles the number of entries in the plan cache for any domain set that is queried with both values, and the parameter looks meaningful when it isn't. Either use answer to actually vary the plan, or drop it from the key entirely.
Suggested change:
def raw_retrieval_plan(self, domains: List[str], answer: bool = False) -> tuple[str, ...]:
    """Return a cached deterministic raw-search plan for the requested domains."""
    ordered_allowed = ("profile", "temporal", "summary", "snippet", "code")
    normalized = tuple(d for d in ordered_allowed if d in set(domains))
    with self._raw_retrieval_plan_cache_lock:
        if normalized not in self._raw_retrieval_plan_cache:
            self._raw_retrieval_plan_cache[normalized] = normalized
        return self._raw_retrieval_plan_cache[normalized]
with self._profile_catalog_cache_lock:
    self._prune_profile_catalog_cache(now)
    self._profile_catalog_cache[user_id] = (now, catalog, results)
Missing double-check before cache write
When two concurrent calls for the same user_id both miss the cache in the first with block, they both release the lock and both execute the expensive search_by_metadata query. When the second with block is reached, there's no re-check of whether another thread already populated the entry. The second writer overwrites the first with a stale now timestamp (captured at the very start of the function). A simple check like if user_id not in self._profile_catalog_cache: before assigning would avoid the redundant overwrite and protect against cache stampede on concurrent first-hit requests for the same user.
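One possible shape for the re-check described above is sketched below. CatalogCache and get_or_fetch are hypothetical names, and the cache value is simplified to a single list. Note that the re-check only avoids the late overwrite with a stale timestamp; both callers may still run the expensive query, so fully preventing a stampede would need something extra such as a per-key lock or a shared in-flight future.

```python
import threading
import time
from typing import Callable, Dict, List, Tuple


class CatalogCache:
    """Hypothetical per-user cache with a double-check before writing."""

    def __init__(self, ttl_seconds: float = 60.0) -> None:
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[float, List[str]]] = {}
        self._lock = threading.Lock()

    def _fresh(self, user_id: str, now: float):
        # Caller must hold the lock. Returns the entry if it is still valid.
        entry = self._entries.get(user_id)
        if entry is not None and now - entry[0] <= self._ttl:
            return entry
        return None

    def get_or_fetch(self, user_id: str, fetch: Callable[[str], List[str]]) -> List[str]:
        with self._lock:
            entry = self._fresh(user_id, time.monotonic())
            if entry is not None:
                return entry[1]
        # The slow query runs outside the lock so other users are not blocked.
        results = fetch(user_id)
        with self._lock:
            # Timestamp is taken after the fetch, not at function entry.
            now = time.monotonic()
            entry = self._fresh(user_id, now)
            if entry is not None:
                # Another caller populated the entry while we were fetching.
                return entry[1]
            self._entries[user_id] = (now, results)
            return results
```

Sequential calls for the same user then hit the backend once; the second call is served from the cache.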
Replaces #189, which was closed accidentally and could not be reopened.
Closes #163
Summary
- /v1/memory/search into a low-latency raw search path across profile, temporal, summary, snippet, and code domains
- answer=true
Follow-up after code-assist review
- asyncio.gather
Tests
- pytest tests/api/test_dependencies_and_routes.py tests/unit/test_schemas.py -q
- python3 -m ruff check src/api/schemas.py src/api/routes/memory.py src/pipelines/retrieval.py tests/api/test_dependencies_and_routes.py